<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
  "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">

<html xmlns="http://www.w3.org/1999/xhtml">
<head><title></title>
<link href="../style/ebook.css" type="text/css" rel="stylesheet"/>
</head>
<body>
<h1>Lineage</h1>
<div class="admonition note">
<p class="first admonition-title">Note</p>
<p class="last">Lineage support is very experimental and subject to change.</p>
</div>
<p>Airflow can help track the origins of data, what happens to it, and where it moves over time. This supports
audit trails and data governance, and it also helps when debugging data flows.</p>
<p>Airflow tracks data through the inlets and outlets of tasks. The following example shows how this
works.</p>
<div class="code python highlight-default notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">airflow.operators.bash_operator</span> <span class="k">import</span> <span class="n">BashOperator</span>
<span class="kn">from</span> <span class="nn">airflow.operators.dummy_operator</span> <span class="k">import</span> <span class="n">DummyOperator</span>
<span class="kn">from</span> <span class="nn">airflow.lineage.datasets</span> <span class="k">import</span> <span class="n">File</span>
<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">DAG</span>
<span class="kn">from</span> <span class="nn">datetime</span> <span class="k">import</span> <span class="n">timedelta</span>
<span class="kn">import</span> <span class="nn">airflow.utils.dates</span>

<span class="n">FILE_CATEGORIES</span> <span class="o">=</span> <span class="p">[</span><span class="s2">&quot;CAT1&quot;</span><span class="p">,</span> <span class="s2">&quot;CAT2&quot;</span><span class="p">,</span> <span class="s2">&quot;CAT3&quot;</span><span class="p">]</span>

<span class="n">args</span> <span class="o">=</span> <span class="p">{</span>
    <span class="s1">&apos;owner&apos;</span><span class="p">:</span> <span class="s1">&apos;airflow&apos;</span><span class="p">,</span>
    <span class="s1">&apos;start_date&apos;</span><span class="p">:</span> <span class="n">airflow</span><span class="o">.</span><span class="n">utils</span><span class="o">.</span><span class="n">dates</span><span class="o">.</span><span class="n">days_ago</span><span class="p">(</span><span class="mi">2</span><span class="p">)</span>
<span class="p">}</span>

<span class="n">dag</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span>
    <span class="n">dag_id</span><span class="o">=</span><span class="s1">&apos;example_lineage&apos;</span><span class="p">,</span> <span class="n">default_args</span><span class="o">=</span><span class="n">args</span><span class="p">,</span>
    <span class="n">schedule_interval</span><span class="o">=</span><span class="s1">&apos;0 0 * * *&apos;</span><span class="p">,</span>
    <span class="n">dagrun_timeout</span><span class="o">=</span><span class="n">timedelta</span><span class="p">(</span><span class="n">minutes</span><span class="o">=</span><span class="mi">60</span><span class="p">))</span>

<span class="n">f_final</span> <span class="o">=</span> <span class="n">File</span><span class="p">(</span><span class="s2">&quot;/tmp/final&quot;</span><span class="p">)</span>
<span class="n">run_this_last</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">&apos;run_this_last&apos;</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">,</span>
    <span class="n">inlets</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;auto&quot;</span><span class="p">:</span> <span class="kc">True</span><span class="p">},</span>
    <span class="n">outlets</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;datasets&quot;</span><span class="p">:</span> <span class="p">[</span><span class="n">f_final</span><span class="p">,]})</span>

<span class="n">f_in</span> <span class="o">=</span> <span class="n">File</span><span class="p">(</span><span class="s2">&quot;/tmp/whole_directory/&quot;</span><span class="p">)</span>
<span class="n">outlets</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">file</span> <span class="ow">in</span> <span class="n">FILE_CATEGORIES</span><span class="p">:</span>
    <span class="n">f_out</span> <span class="o">=</span> <span class="n">File</span><span class="p">(</span><span class="s2">&quot;/tmp/</span><span class="si">{}</span><span class="s2">/{{{{ execution_date }}}}&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">file</span><span class="p">))</span>
    <span class="n">outlets</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">f_out</span><span class="p">)</span>
<span class="n">run_this</span> <span class="o">=</span> <span class="n">BashOperator</span><span class="p">(</span>
    <span class="n">task_id</span><span class="o">=</span><span class="s1">&apos;run_me_first&apos;</span><span class="p">,</span> <span class="n">bash_command</span><span class="o">=</span><span class="s1">&apos;echo 1&apos;</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">,</span>
    <span class="n">inlets</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;datasets&quot;</span><span class="p">:</span> <span class="p">[</span><span class="n">f_in</span><span class="p">,]},</span>
    <span class="n">outlets</span><span class="o">=</span><span class="p">{</span><span class="s2">&quot;datasets&quot;</span><span class="p">:</span> <span class="n">outlets</span><span class="p">}</span>
    <span class="p">)</span>
<span class="n">run_this</span><span class="o">.</span><span class="n">set_downstream</span><span class="p">(</span><span class="n">run_this_last</span><span class="p">)</span>
</pre>
</div>
</div>
<p>Tasks take the parameters <cite>inlets</cite> and <cite>outlets</cite>. Inlets can be defined manually as a list of datasets, <cite>{&quot;datasets&quot;:
[dataset1, dataset2]}</cite>; configured to pick up the outlets of specific upstream tasks, <cite>{&quot;task_ids&quot;: [&quot;task_id1&quot;, &quot;task_id2&quot;]}</cite>;
configured to pick up the outlets of direct upstream tasks, <cite>{&quot;auto&quot;: True}</cite>; or any combination of these. Outlets
are defined as a list of datasets, <cite>{&quot;datasets&quot;: [dataset1, dataset2]}</cite>. All fields of a dataset are templated with
the context when the task is executed.</p>
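<p>The three inlet forms can be easier to compare side by side. The following is a minimal sketch in plain Python that models how they might combine; it does not import Airflow and is not Airflow&#x2019;s implementation. The function <cite>resolve_inlets</cite>, the task ids, the paths, and the de-duplication rule are all assumptions made for illustration.</p>
<div class="code python highlight-default notranslate"><div class="highlight"><pre>
```python
# Illustrative model only: this is NOT Airflow's implementation.
# Outlets declared by each upstream task, keyed by task_id (hypothetical data).
upstream_outlets = {
    "extract": ["/tmp/raw"],
    "clean": ["/tmp/clean"],
    "enrich": ["/tmp/enriched"],
}


def resolve_inlets(inlets, direct_upstream_ids, upstream_outlets):
    """Combine the three inlet forms into one ordered, de-duplicated list."""
    # "datasets": inlets that were listed manually.
    resolved = list(inlets.get("datasets", []))
    # "task_ids": pick up the outlets of the named upstream tasks.
    task_ids = list(inlets.get("task_ids", []))
    # "auto": pick up the outlets of the direct upstream tasks.
    if inlets.get("auto", False):
        task_ids.extend(direct_upstream_ids)
    for task_id in task_ids:
        for dataset in upstream_outlets.get(task_id, []):
            if dataset not in resolved:  # assumed rule: skip duplicates
                resolved.append(dataset)
    return resolved


# The task being configured has one direct upstream task, "enrich".
manual = resolve_inlets({"datasets": ["/tmp/manual"]}, ["enrich"], upstream_outlets)
by_task = resolve_inlets({"task_ids": ["extract", "clean"]}, ["enrich"], upstream_outlets)
auto = resolve_inlets({"auto": True}, ["enrich"], upstream_outlets)
combined = resolve_inlets(
    {"datasets": ["/tmp/manual"], "task_ids": ["extract"], "auto": True},
    ["enrich"], upstream_outlets)

print(manual)
print(by_task)
print(auto)
print(combined)
```
</pre>
</div>
</div>
<p>In this model the manual datasets come first, followed by the outlets of the named tasks and then those of the direct upstream tasks. The ordering is a choice of the sketch, not a documented guarantee.</p>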
<div class="admonition note">
<p class="first admonition-title">Note</p>
<p class="last">Operators can add inlets and outlets automatically if the operator supports it.</p>
</div>
<p>In the example DAG, task <cite>run_me_first</cite> is a BashOperator with one inlet, <cite>/tmp/whole_directory/</cite>, and 3 outlets, one for each of <cite>CAT1</cite>, <cite>CAT2</cite>, and <cite>CAT3</cite>, that are
generated from a list. Note that <cite>execution_date</cite> in each outlet path is templated and will be rendered when the task is running.</p>
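<p>The quadruple braces in the example DAG are easy to misread, so here is a small sketch of what they produce. It uses only <cite>str.format</cite> from the standard library. The <cite>render</cite> helper and the date value are hypothetical stand-ins for the template rendering that happens when the task runs.</p>
<div class="code python highlight-default notranslate"><div class="highlight"><pre>
```python
FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]

# str.format() fills the first {} with the category and collapses each
# doubled brace pair, so {{{{ becomes {{ and }}}} becomes }}.
# The result still contains a template placeholder for execution_date.
paths = ["/tmp/{}/{{{{ execution_date }}}}".format(category)
         for category in FILE_CATEGORIES]


def render(path, execution_date):
    # Hypothetical stand-in for template rendering: a plain string replace.
    return path.replace("{{ execution_date }}", execution_date)


# Assumed example value; the real value comes from the task context.
rendered = [render(path, "2019-01-01T00:00:00") for path in paths]

print(paths[0])     # /tmp/CAT1/{{ execution_date }}
print(rendered[0])  # /tmp/CAT1/2019-01-01T00:00:00
```
</pre>
</div>
</div>
<p>The placeholder survives DAG parsing unchanged, which is why the path can differ for every run of the task.</p>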
<div class="admonition note">
<p class="first admonition-title">Note</p>
<p class="last">Behind the scenes, Airflow prepares the lineage metadata as part of the <cite>pre_execute</cite> method of a task. When the task
has finished execution, <cite>post_execute</cite> is called and lineage metadata is pushed into XCom. Thus, if you are creating
your own operators that override these methods, make sure to decorate them with <cite>prepare_lineage</cite> and <cite>apply_lineage</cite>
respectively.</p>
</div>
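<p>The decoration pattern described in the note can be sketched without Airflow installed. The two decorators below are hypothetical stand-ins that only record when they run; they are not Airflow&#x2019;s own <cite>prepare_lineage</cite> and <cite>apply_lineage</cite>, and the order in which lineage work and the wrapped method run is an assumption of this sketch. <cite>MyOperator</cite> is likewise an illustrative class, not a real operator.</p>
<div class="code python highlight-default notranslate"><div class="highlight"><pre>
```python
import functools

# Records the order in which the pieces run.
calls = []


def prepare_lineage(func):
    # Hypothetical stand-in: the real decorator prepares lineage metadata.
    @functools.wraps(func)
    def wrapper(self, context, *args, **kwargs):
        calls.append("prepare_lineage")
        return func(self, context, *args, **kwargs)
    return wrapper


def apply_lineage(func):
    # Hypothetical stand-in: the real decorator pushes lineage metadata to XCom.
    @functools.wraps(func)
    def wrapper(self, context, *args, **kwargs):
        result = func(self, context, *args, **kwargs)
        calls.append("apply_lineage")
        return result
    return wrapper


class MyOperator(object):
    """Illustrative operator that overrides both hooks and keeps lineage handling."""

    @prepare_lineage
    def pre_execute(self, context):
        calls.append("pre_execute")

    @apply_lineage
    def post_execute(self, context, result=None):
        calls.append("post_execute")


op = MyOperator()
op.pre_execute({})
op.post_execute({})
print(calls)
```
</pre>
</div>
</div>
<p>In a real operator you would import Airflow&#x2019;s decorators instead of defining stand-ins. The point of the sketch is only that an override without the decorator silently skips the lineage step.</p>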
<div class="section" id="apache-atlas">
<h2 class="sigil_not_in_toc">Apache Atlas</h2>
<p>Airflow can send its lineage metadata to Apache Atlas. You need to enable the <cite>atlas</cite> backend and configure it
properly, e.g. in your <cite>airflow.cfg</cite>:</p>
<div class="code python highlight-default notranslate"><div class="highlight"><pre><span></span><span class="p">[</span><span class="n">lineage</span><span class="p">]</span>
<span class="n">backend</span> <span class="o">=</span> <span class="n">airflow</span><span class="o">.</span><span class="n">lineage</span><span class="o">.</span><span class="n">backend</span><span class="o">.</span><span class="n">atlas</span>

<span class="p">[</span><span class="n">atlas</span><span class="p">]</span>
<span class="n">username</span> <span class="o">=</span> <span class="n">my_username</span>
<span class="n">password</span> <span class="o">=</span> <span class="n">my_password</span>
<span class="n">host</span> <span class="o">=</span> <span class="n">host</span>
<span class="n">port</span> <span class="o">=</span> <span class="mi">21000</span>
</pre>
</div>
</div>
<p>Please make sure to have the <cite>atlasclient</cite> package installed.</p>
</div>
</body>
</html>